package com.hzh.flink.source

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

object Demo5KafkaSource {
  def main(args: Array[String]): Unit = {
    // create the Flink streaming execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    /**
     * Build the Kafka source
     */
    val source: KafkaSource[String] = KafkaSource
      .builder[String]
      .setBootstrapServers("master:9092,node1:9092,node2:9092") // Kafka cluster broker list
      .setTopics("test_topic2") // topic to consume
      .setGroupId("my-group") // consumer group id; within a group each record is consumed only once
      .setStartingOffsets(OffsetsInitializer.earliest()) // starting offsets: earliest reads all existing data, latest reads only newly arriving data
      .setValueOnlyDeserializer(new SimpleStringSchema()) // deserializer for the record value
      .build()
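
    // Note: besides earliest()/latest(), OffsetsInitializer can also resume from the consumer
    // group's committed offsets. A sketch (assuming the Kafka client's OffsetResetStrategy
    // enum is imported):
    //   .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    // This continues from the last committed position and falls back to earliest when the
    // group has no committed offset yet.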

    // read from the Kafka source as a DataStream
    val kafkaDS: DataStream[String] = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")

    kafkaDS.print()

    env.execute()

  }
}

